package com.mygame.gateway.server;

import javax.annotation.PostConstruct;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import com.xinyue.network.message.stream.GatewayMessageHeader;
import com.xinyue.network.message.stream.ServiceMessageHeader;
import com.xinyue.network.message.stream.TransferMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;

/**
 * 
 * @ClassName: GameMessageConsume
 * @Description: 接收业务服务返回的消息，并发送到客户端
 * @author: wgs
 * @date: 2019年5月15日 上午9:17:48
 */
@Service
//监听单独发送到此网关的消息，需要加上此网关的serverId
@RocketMQMessageListener(topic = "${spring.cloud.nacos.discovery.namespace}-XinyueBusinessTopic-${xinyue.game.common.config.local-server-id}", 
consumerGroup = "${spring.cloud.nacos.discovery.namespace}-GatewayConsumeBusinessGroup-${xinyue.game.common.config.local-server-id}")
public class ReceiverGameMessageResponseService implements RocketMQListener<MessageExt> {
	private static Logger logger = LoggerFactory.getLogger(ReceiverGameMessageResponseService.class);
	@Autowired
	private ChannelService channelService;
	@Autowired
	private Environment environment;
	@PostConstruct
	public void init() {
		RocketMQMessageListener rocketMQMessageListener = this.getClass().getAnnotation(RocketMQMessageListener.class);
		if(rocketMQMessageListener == null) {
			logger.error("没有添加消费消息监听");
			return ;
		}
		logger.info("监听业务响应消息topic:{},group:{}",environment.resolveRequiredPlaceholders(rocketMQMessageListener.topic()),environment.resolveRequiredPlaceholders(rocketMQMessageListener.consumerGroup()));
	}

	@Override
	public void onMessage(MessageExt message) {
	        sendMessage(message, channelService);
	}
	
	public static void sendMessage(MessageExt message,ChannelService channelService) {
	    byte[] data = message.getBody();
        if (data != null) {
            try {
                ServiceMessageHeader header = new ServiceMessageHeader();
                ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
                header.read(byteBuf);
                long playerId = header.getPlayerId();
                Channel channel = channelService.getChannel(playerId);// 根据playerId找到这个客户端的连接Channel
                if (channel != null) {
                    GatewayMessageHeader gatewayMessageHeader = new GatewayMessageHeader();
                    //转化为网关包头数据
                    BeanUtils.copyProperties(header, gatewayMessageHeader);
                    ByteBuf bodyBuf = null;
                   if(byteBuf.isReadable()) {
                       byte[] bytes = new byte[byteBuf.readableBytes()];
                       byteBuf.readBytes(bytes);
                       bytes = compress(bytes,gatewayMessageHeader);
                       bytes = encrypt(bytes,gatewayMessageHeader);
                       bodyBuf = Unpooled.wrappedBuffer(bytes);
                   }
                   
                    TransferMessage transferMessage = new TransferMessage(gatewayMessageHeader, bodyBuf);
                    channel.writeAndFlush(transferMessage);// 给客户端返回消息
                }
                logger.debug("网关返回消息：{}",header);
            } catch (Throwable e) {
                logger.error("网关接收消息异常", e);
            }
        }
	}
	private static byte[] compress(byte[] data,GatewayMessageHeader header) {
	    //TODO 在这里对消息进行是否压缩的判断
	    return data;
	}
	private static byte[] encrypt(byte[] data,GatewayMessageHeader header) {
	    //TODO 在这里对消息进行是否加密的判断
	    return data;
	}
}
